
Time partition restrictions #1405


Open
wants to merge 6 commits into base: main

Conversation

nikhilsinhaparseable (Contributor) commented Aug 11, 2025

Create a stream with a custom time partition using the headers:
X-P-Time-Partition: <field-name> - this has to be a timestamp field
X-P-Time-Partition-Limit: max historical range, defaults to 30d

The server validates that every event in the batch has this time partition field; if any event is missing it, the whole batch is rejected.
Fixes: #1400
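
For illustration only, a rough client-side sketch of the flow described above, written with reqwest. The endpoint paths, credentials, and the X-P-Stream header are assumptions; the two time-partition headers and the `source_time` field come from this PR.

```rust
use reqwest::Client;
use serde_json::json;

#[tokio::main]
async fn main() -> Result<(), reqwest::Error> {
    let client = Client::new();

    // Create a stream partitioned on the `source_time` timestamp field.
    // X-P-Time-Partition-Limit caps the historical range (defaults to 30d when omitted).
    client
        .put("http://localhost:8000/api/v1/logstream/demo") // path is an assumption
        .basic_auth("admin", Some("admin"))
        .header("X-P-Time-Partition", "source_time")
        .header("X-P-Time-Partition-Limit", "90d")
        .send()
        .await?;

    // Every event in the batch must carry `source_time`; otherwise the
    // server rejects the whole batch.
    client
        .post("http://localhost:8000/api/v1/ingest") // path and X-P-Stream header are assumptions
        .basic_auth("admin", Some("admin"))
        .header("X-P-Stream", "demo")
        .json(&json!([
            { "a": "b", "source_time": "2025-08-01T00:00:00.000Z" }
        ]))
        .send()
        .await?;

    Ok(())
}
```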

Summary by CodeRabbit

  • New Features
    • Support creating streams with time-based partitioning and an explicit retention limit; enforces mutual exclusivity with custom partitions.
  • Refactor
    • Faster catalog updates and uploads via parallel, partition-aware processing and concurrent file handling.
    • Public exposure of select HTTP header keys for integration use.
    • Adjusted JSON flattening to better handle partitioned and non-partitioned inputs.
    • Ingestion blocked for time-partitioned streams; missing streams now return errors.
  • Bug Fixes
    • Prevented accidental overwrite of staging files by renaming on conflict.
  • Chores
    • Added parallelism dependency.

coderabbitai bot (Contributor) commented Aug 11, 2025

Walkthrough

Batch-oriented, partition-aware ingestion and catalog updates were introduced. Ingestion utilities gained a time_partition parameter, HTTP ingest now errors on time-partitioned streams, JSON flattening was refactored for partition-aware processing, object storage uploads were parallelized with manifest aggregation, and several internal behaviors (staging writer rename, ULID grouping, logging) were adjusted. Rayon was added as a dependency.

Changes

Cohort / File(s) Summary
Dependencies
Cargo.toml
Added rayon = "1.8" to dependencies.
Catalog batching & partitions
src/catalog/mod.rs
update_snapshot now accepts Vec<manifest::File>, groups changes by time partitions using Rayon, processes per-partition updates, and appends new manifest items to snapshots. create_manifest adapted for batch semantics.
HTTP ingest handlers
src/handlers/http/ingest.rs
Stream retrieval errors now bubble up; ingestion blocked for time-partitioned streams; updated calls to flatten_and_push_logs with a new 5th parameter None; consistent propagation across OTEL paths.
Ingest utilities API
src/handlers/http/modal/utils/ingest_utils.rs
Added time_partition: Option<String> to flatten_and_push_logs and push_logs; unified conversion path always calling convert_array_to_object with partition args; propagated time partition across sources.
Handler constants visibility
src/handlers/mod.rs
Made several header/flag constants public (e.g., TIME_PARTITION_KEY, CUSTOM_PARTITION_KEY, AUTHORIZATION_KEY, etc.).
Stream create/update logic
src/parseable/mod.rs
Validates mutual exclusivity of time_partition and custom_partition; computes time_partition_in_days; passes time/custom partition info to create_stream.
Staging writer collision handling
src/parseable/staging/writer.rs
On existing target file, preserves prior file by renaming with random 8-char prefix before final rename; added rand utilities.
Arrow file grouping
src/parseable/streams.rs
Replaced mode-based node id with ulid::Ulid for grouping; removed helper/imports; adjusted tests and expectations.
Field stats logging & call updates
src/storage/field_stats.rs
Downgraded some warnings to trace; updated call to flatten_and_push_logs with new parameter.
Concurrent staging upload pipeline
src/storage/object_storage.rs
Introduced concurrent upload pipeline with bounded concurrency; added UploadContext and UploadResult; manifest aggregation and optional stats calculation; schema processing; path building with custom partition.
JSON flattening and partition-aware processing
src/utils/json/mod.rs
Added partition-aware processing paths; generic flattening decision helpers; refactored convert_array_to_object to return partitioned batches vs single-batch for non-partitioned; added tests.

Sequence Diagram(s)

sequenceDiagram
  participant Client
  participant HTTP Ingest
  participant Stream Store
  participant Ingest Utils
  participant JSON Utils

  Client->>HTTP Ingest: POST /ingest
  HTTP Ingest->>Stream Store: get_stream(stream_name)
  Stream Store-->>HTTP Ingest: Stream or Error
  alt Stream has time_partition or Error
    HTTP Ingest-->>Client: PostError (disallowed or retrieval error)
  else Proceed
    HTTP Ingest->>Ingest Utils: flatten_and_push_logs(..., time_partition=None)
    Ingest Utils->>JSON Utils: convert_array_to_object(..., time_partition=None)
    JSON Utils-->>Ingest Utils: Vec<Value>
    Ingest Utils-->>HTTP Ingest: Ok
    HTTP Ingest-->>Client: 200 OK
  end
sequenceDiagram
  participant Staging
  participant ObjStorage
  participant Concurrency Pool
  participant Storage Backend
  participant Catalog
  participant Stats

  Staging->>ObjStorage: process_parquet_files(stream, files)
  ObjStorage->>Concurrency Pool: spawn upload tasks (<=100)
  Concurrency Pool->>Storage Backend: upload file
  Concurrency Pool->>Catalog: build manifest for file
  Concurrency Pool->>Stats: calculate if enabled
  Concurrency Pool-->>ObjStorage: UploadResult (manifest, stats flag)
  ObjStorage->>Catalog: update_snapshot_with_manifests(Vec<manifest::File>)
  ObjStorage->>Stats: handle_stats_sync(if any)
  ObjStorage-->>Staging: cleanup staged files
sequenceDiagram
  participant Producer
  participant Catalog
  participant Partitioner

  Producer->>Catalog: update_snapshot(changes: Vec<File>)
  Catalog->>Partitioner: group_changes_by_partition
  Partitioner-->>Catalog: HashMap<partition, Vec<File>>
  loop partitions
    Catalog->>Catalog: process_single_partition(...)
  end
  Catalog-->>Producer: Ok

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~60 minutes

Assessment against linked issues

Objective: Allow changing default time partition field for queries (historical data) [#1400]
Addressed: No. No changes to query layer or default time field selection; ingestion-side time partition allowed/blocked inconsistently; no UI/API to select query field.

Assessment against linked issues: Out-of-scope changes

Code Change / Explanation
Concurrent upload pipeline with UploadContext/UploadResult (src/storage/object_storage.rs): Not related to changing query time field; focuses on staging upload, manifests, and stats.
Staging writer rename strategy to avoid overwrites (src/parseable/staging/writer.rs): File collision handling is unrelated to query time field configuration.
ULID-based arrow file grouping and related test updates (src/parseable/streams.rs): Affects file grouping and tests; not tied to query-time field configurability.
Publicizing handler constants (src/handlers/mod.rs): Visibility changes for headers/keys are unrelated to selecting query time field.
Partition-aware JSON flattening behavior (src/utils/json/mod.rs): Ingestion transformation changes do not expose controls to set the default query time field.

Possibly related PRs

Suggested labels

for next release

Suggested reviewers

  • de-sh
  • parmesant
  • nitisht

Poem

In burrows where the logs all stream,
I shuffle bytes with whiskered gleam—
Partitions marked, snapshots aligned,
Parallel hops, all neatly timed.
A ULID twitch, a schema tune—
Hoppity hooray, shipped by noon! 🐇✨


@nikhilsinhaparseable nikhilsinhaparseable marked this pull request as ready for review August 18, 2025 04:04
coderabbitai bot left a comment

Actionable comments posted: 6

🔭 Outside diff range comments (4)
src/handlers/http/ingest.rs (1)

256-270: OTEL ingestion should also pass effective time_partition

To uniformly enforce time-partition validation for OTEL, compute the same effective time-partition (header override or stream default) and pass it to flatten_and_push_logs.

-                flatten_and_push_logs(
-                    serde_json::from_slice(&body)?,
-                    stream_name,
-                    log_source,
-                    &p_custom_fields,
-                    None,
-                )
+                let time_partition = req
+                    .headers()
+                    .get("x-p-time-partition")
+                    .and_then(|h| h.to_str().ok())
+                    .map(|s| s.to_string())
+                    .or_else(|| PARSEABLE.get_stream(stream_name).ok()?.get_time_partition());
+
+                flatten_and_push_logs(
+                    serde_json::from_slice(&body)?,
+                    stream_name,
+                    log_source,
+                    &p_custom_fields,
+                    time_partition,
+                )
                 .await?;
src/handlers/http/modal/utils/ingest_utils.rs (1)

133-139: Regression: stream-configured time_partition no longer enforced when caller passes None

push_logs now only uses the passed time_partition and no longer derives it from the stream. If call sites forget to pass it (several currently pass None), time-partition validation silently won’t run. To preserve correctness:

  • Compute an effective time partition: per-request override if present, else stream-configured.
  • Use that for both convert_array_to_object and into_event.
 pub async fn push_logs(
     stream_name: &str,
     json: Value,
     log_source: &LogSource,
     p_custom_fields: &HashMap<String, String>,
-    time_partition: Option<String>,
+    time_partition: Option<String>,
 ) -> Result<(), PostError> {
     let stream = PARSEABLE.get_stream(stream_name)?;
     let time_partition_limit = PARSEABLE
         .get_stream(stream_name)?
         .get_time_partition_limit();
     let static_schema_flag = stream.get_static_schema_flag();
     let custom_partition = stream.get_custom_partition();
     let schema_version = stream.get_schema_version();
     let p_timestamp = Utc::now();
 
-    let data = convert_array_to_object(
+    // Prefer per-request override, else fall back to the stream-configured time partition
+    let effective_time_partition = time_partition.or_else(|| stream.get_time_partition());
+
+    let data = convert_array_to_object(
         json,
-        time_partition.as_ref(),
+        effective_time_partition.as_ref(),
         time_partition_limit,
         custom_partition.as_ref(),
         schema_version,
         log_source,
     )?;
 
     for json in data {
         let origin_size = serde_json::to_vec(&json).unwrap().len() as u64; // string length need not be the same as byte length
         let schema = PARSEABLE.get_stream(stream_name)?.get_schema_raw();
         json::Event { json, p_timestamp }
             .into_event(
                 stream_name.to_owned(),
                 origin_size,
                 &schema,
                 static_schema_flag,
                 custom_partition.as_ref(),
-                time_partition.as_ref(),
+                effective_time_partition.as_ref(),
                 schema_version,
                 StreamType::UserDefined,
                 p_custom_fields,
             )?
             .process()?;
     }
     Ok(())
 }

Also applies to: 149-156, 168-169

src/catalog/mod.rs (1)

90-109: Unsafe unwraps in get_file_bounds can crash the node

find(...).unwrap() and stats.as_ref().unwrap() will panic if:

  • the expected column is missing in a file, or
  • stats are absent (e.g., file with schema/metadata drift).

Prefer returning a handled error or skipping such files with a log. Minimal fix:

-fn get_file_bounds(
-    file: &manifest::File,
-    partition_column: String,
-) -> (DateTime<Utc>, DateTime<Utc>) {
-    match file
-        .columns()
-        .iter()
-        .find(|col| col.name == partition_column)
-        .unwrap()
-        .stats
-        .as_ref()
-        .unwrap()
-    {
+fn get_file_bounds(
+    file: &manifest::File,
+    partition_column: String,
+) -> (DateTime<Utc>, DateTime<Utc>) {
+    let col = match file.columns().iter().find(|col| col.name == partition_column) {
+        Some(c) => c,
+        None => {
+            error!("Partition column {partition_column} not found in manifest file {}", file.file_path);
+            // Fallback: treat as empty range to avoid crash; adjust if a different strategy is preferred
+            return (DateTime::from_timestamp_millis(0).unwrap(), DateTime::from_timestamp_millis(0).unwrap());
+        }
+    };
+    let stats = match col.stats.as_ref() {
+        Some(s) => s,
+        None => {
+            error!("Missing stats for column {partition_column} in {}", file.file_path);
+            return (DateTime::from_timestamp_millis(0).unwrap(), DateTime::from_timestamp_millis(0).unwrap());
+        }
+    };
+    match stats {
         column::TypedStatistics::Int(stats) => (
             DateTime::from_timestamp_millis(stats.min).unwrap(),
             DateTime::from_timestamp_millis(stats.max).unwrap(),
         ),
         _ => unreachable!(),
     }
 }

If a hard error is preferred instead of a fallback, we can return a Result and propagate it.

src/utils/json/mod.rs (1)

195-217: Document wrapping of array inputs in convert_array_to_object

Call sites and tests confirm that when body is a JSON array (and no partitioning), the entire array is returned as a single Vec<Value>. Please update the docstring on src/utils/json/mod.rs (around the convert_array_to_object signature, ~line 219) to explicitly state:

  • For array inputs without partitioning, the function wraps the full array into a one-element Vec.

[src/utils/json/mod.rs:219]
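
A possible wording, sketched as a Rust doc comment on a stubbed signature (parameter list elided; exact phrasing and names are hypothetical):

```rust
use serde_json::Value;

/// Converts a JSON body into batches ready for ingestion.
///
/// - With a time partition and/or custom partition configured, every element is
///   flattened and validated individually; one invalid element rejects the batch.
/// - Without partitioning, an array input is not split: the entire (flattened)
///   array is wrapped into a single-element `Vec`.
pub fn convert_array_to_object(
    _body: Value, /* ...remaining parameters elided in this sketch... */
) -> Result<Vec<Value>, anyhow::Error> {
    unimplemented!("sketch only; see the real implementation in src/utils/json/mod.rs")
}
```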

🧹 Nitpick comments (14)
src/parseable/streams.rs (1)

1368-1371: Avoid println in tests; assert update is correct.

The updated assertion to expect 3 Arrow files is correct for the same-minute case before compaction. Replace the println with tracing to keep tests clean.

Apply this diff:

-        println!("arrow files: {:?}", staging.arrow_files());
+        tracing::debug!("arrow files: {:?}", staging.arrow_files());
src/parseable/mod.rs (1)

922-931: Custom partition validation limits to a single key.

Matches the documented constraint and simplifies downstream layout. Consider updating the error message to include the provided value for quicker debugging.

Apply this diff:

-        return Err(CreateStreamError::Custom {
-            msg: "Maximum 1 custom partition key is supported".to_string(),
+        return Err(CreateStreamError::Custom {
+            msg: format!("Maximum 1 custom partition key is supported (got: {custom_partition})"),
             status: StatusCode::BAD_REQUEST,
         });
src/storage/field_stats.rs (3)

180-181: Lowering log level to trace may hide actionable failures

These failures previously surfaced at warn!. Moving to trace! will make them easy to miss in production. Consider debug! with field context (stream/field) so operators can correlate silent drops.

- trace!("Failed to execute distinct stats query: {e}");
+ debug!("Failed to execute distinct stats query: {e}");

188-189: Same concern: batch fetch errors downgraded to trace

If the stream is noisy, this will fly under the radar. Suggest debug! and include stream_name/field_name to aid troubleshooting.

- trace!("Failed to fetch batch in distinct stats query: {e}");
+ debug!("Failed to fetch batch in distinct stats query: {e}");

216-217: Field-specific query failure should not be fully silent

trace! here likely hides intermittent schema/data issues. Recommend debug! (or keep warn!) to retain visibility.

- trace!("Failed to execute distinct stats query for field: {field_name}, error: {e}");
+ debug!("Failed to execute distinct stats query for field: {field_name}, error: {e}");
src/handlers/http/ingest.rs (1)

515-545: HTTP status for CustomError

PostError::CustomError maps to 500. For client-side misuse (e.g., bad header combo), 400 is more appropriate. Consider a dedicated variant (e.g., TimePartitionIngestionError) mapped to 400 instead of reusing CustomError.
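
Sketched roughly below, assuming the error type implements actix-web's ResponseError; the variant name and message are made up and existing variants are elided:

```rust
use actix_web::http::StatusCode;

#[derive(Debug, thiserror::Error)]
pub enum PostError {
    // ...existing variants elided in this sketch...
    #[error("time partition error: {0}")]
    TimePartitionError(String),
}

impl actix_web::ResponseError for PostError {
    fn status_code(&self) -> StatusCode {
        match self {
            // Client misuse (bad header combination, missing partition field)
            // should surface as 400 rather than a blanket 500.
            PostError::TimePartitionError(_) => StatusCode::BAD_REQUEST,
        }
    }
}
```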

src/catalog/mod.rs (1)

130-158: Parallel grouping is fine; consider determinism only if needed

Using Rayon to group by partition is fine. If deterministic processing order ever matters, add a final sort of keys before iteration. Not required right now.
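
If deterministic order is ever needed, a small helper along these lines would do (sketch only; the key and file types are stand-ins for whatever the grouping produces):

```rust
use std::collections::HashMap;

/// Process grouped changes in a stable, sorted partition order.
fn for_each_partition_in_order<K: Ord, V>(
    groups: HashMap<K, Vec<V>>,
    mut handle: impl FnMut(&K, &[V]),
) {
    // Sort once, then process partitions in a deterministic sequence.
    let mut partitions: Vec<(K, Vec<V>)> = groups.into_iter().collect();
    partitions.sort_by(|a, b| a.0.cmp(&b.0));
    for (key, files) in &partitions {
        handle(key, files);
    }
}
```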

src/storage/object_storage.rs (2)

972-999: Minor: spawn_parquet_upload_task doesn’t need to be async

This function only schedules a task; making it fn avoids an unnecessary .await.

-async fn spawn_parquet_upload_task(
+fn spawn_parquet_upload_task(
     join_set: &mut JoinSet<Result<UploadResult, ObjectStorageError>>,
@@
-) {
+) {
@@
-    join_set.spawn(async move {
+    join_set.spawn(async move {
         let _permit = semaphore.acquire().await.expect("semaphore is not closed");
 
         upload_single_parquet_file(store, path, stream_relative_path, stream_name, schema).await
     });
-}
+}

1001-1044: Result collection strategy is fine; consider best-effort behavior on single-file errors

Currently, the first per-file error aborts the entire sync with an error, potentially leaving other tasks running until JoinSet is dropped. If you prefer best-effort uploads, collect all successes and log failures, then proceed to snapshot update. Optional.
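
If best-effort semantics are ever wanted, draining the JoinSet could look roughly like this (sketch; the result and error types are generic stand-ins for UploadResult and ObjectStorageError):

```rust
use tokio::task::JoinSet;

/// Drain a JoinSet of upload tasks, keeping successes and logging failures
/// instead of aborting the whole sync on the first error.
async fn collect_best_effort<T, E: std::fmt::Display>(
    mut join_set: JoinSet<Result<T, E>>,
) -> Vec<T> {
    let mut ok = Vec::new();
    while let Some(joined) = join_set.join_next().await {
        match joined {
            Ok(Ok(result)) => ok.push(result),
            Ok(Err(e)) => tracing::error!("upload task failed: {e}"),
            Err(e) => tracing::error!("upload task panicked or was cancelled: {e}"),
        }
    }
    ok
}
```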

src/utils/json/mod.rs (5)

66-75: De-duplicate gating logic: reuse this helper in flatten_json_body for consistency

You’ve introduced should_apply_generic_flattening() but flatten_json_body re-implements the same condition inline. Using the helper there avoids drift and keeps behavior consistent across partitioned and non-partitioned paths.

Outside this hunk, consider updating flatten_json_body as:

let mut nested_value = if should_apply_generic_flattening(&body, schema_version, log_source) {
    let flattened_json = generic_flattening(&body)?;
    convert_to_array(flattened_json)?
} else {
    body
};

77-115: Simplify: always iterate the flattened results (remove special-casing len == 1)

Both branches call flatten::flatten on each resulting item; the only difference is avoiding a tiny loop for len == 1. Unify to reduce code and potential divergence.

Apply this diff:

 fn apply_generic_flattening_for_partition(
     element: Value,
     time_partition: Option<&String>,
     time_partition_limit: Option<NonZeroU32>,
     custom_partition: Option<&String>,
 ) -> Result<Vec<Value>, anyhow::Error> {
-    let flattened_json = generic_flattening(&element)?;
-
-    if flattened_json.len() == 1 {
-        // Single result - process normally
-        let mut nested_value = flattened_json.into_iter().next().unwrap();
-        flatten::flatten(
-            &mut nested_value,
-            "_",
-            time_partition,
-            time_partition_limit,
-            custom_partition,
-            true,
-        )?;
-        Ok(vec![nested_value])
-    } else {
-        // Multiple results - process each individually
-        let mut result = Vec::new();
-        for item in flattened_json {
-            let mut processed_item = item;
-            flatten::flatten(
-                &mut processed_item,
-                "_",
-                time_partition,
-                time_partition_limit,
-                custom_partition,
-                true,
-            )?;
-            result.push(processed_item);
-        }
-        Ok(result)
-    }
+    let flattened_json = generic_flattening(&element)?;
+    let mut result = Vec::with_capacity(flattened_json.len());
+    for mut item in flattened_json {
+        flatten::flatten(
+            &mut item,
+            "_",
+            time_partition,
+            time_partition_limit,
+            custom_partition,
+            true,
+        )?;
+        result.push(item);
+    }
+    Ok(result)
 }

149-171: Pre-allocate results; consider parallelizing for large batches

The loop is fine. Two small options:

  • Pre-allocate: result.reserve(arr.len()) as a lower bound.
  • Optional future: use rayon to parallelize per-element processing if CPU-bound and safe (PR mentions rayon was added elsewhere).

Minimal pre-allocation diff:

 fn process_partitioned_array(
@@
 ) -> Result<Vec<Value>, anyhow::Error> {
-    let mut result = Vec::new();
+    let mut result = Vec::new();
+    result.reserve(arr.len()); // lower bound; items can expand

If/when you choose to parallelize, you can adopt rayon and then flatten:

// Outside this hunk, add once: use rayon::prelude::*;
let chunks: Result<Vec<Vec<Value>>, _> = arr
    .into_par_iter()
    .map(|element| {
        process_partitioned_element(
            element,
            time_partition,
            time_partition_limit,
            custom_partition,
            schema_version,
            log_source,
        )
    })
    .collect();

let result = chunks?.into_iter().flatten().collect();

483-515: Add a negative test: missing time-partition field should reject the whole batch

Given the PR objective (reject batch if any event lacks the partition field), add a test where one element omits source_time and assert that convert_array_to_object returns Err.

Add alongside this test:

#[test]
fn test_convert_array_to_object_with_time_partition_missing_field_rejects_batch() {
    let json = json!([
        { "a": "b", "source_time": "2025-08-01T00:00:00.000Z" },
        { "a": "b" } // missing required partition field
    ]);

    let time_partition = Some("source_time".to_string());
    let result = convert_array_to_object(
        json,
        time_partition.as_ref(),
        None,
        None,
        SchemaVersion::V0,
        &crate::event::format::LogSource::default(),
    );

    assert!(result.is_err(), "Batch should be rejected if any event lacks time partition field");
}

517-545: Consider adding a SchemaVersion::V1 case to exercise generic flattening gating

This test validates the new non-partitioned batching, but only for V0. A small V1 case with shallow nesting would exercise should_apply_generic_flattening and guard against regressions in the gating condition.

Example to add near this test:

#[test]
fn test_non_partitioned_v1_generic_flattening_applies() {
    // Single object with an array to be expanded by generic_flattening
    let json = json!({
        "id": 1,
        "items": [ {"x": 10}, {"x": 20} ]
    });

    let result = convert_array_to_object(
        json.clone(),
        None,
        None,
        None,
        SchemaVersion::V1,
        &crate::event::format::LogSource::Json, // matches gating
    );

    assert!(result.is_ok());
    let out = result.unwrap();
    assert_eq!(out.len(), 1, "Non-partitioned path should still return a single batch item");
    // And the single batch item should be an array with expanded objects
    // depending on flatten_json_body's V1 behavior.
    // You can assert shape here based on expected expansion semantics.
}
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro


📥 Commits

Reviewing files that changed from the base of the PR and between 08bece6 and 95aacf6.

⛔ Files ignored due to path filters (1)
  • Cargo.lock is excluded by !**/*.lock
📒 Files selected for processing (11)
  • Cargo.toml (1 hunks)
  • src/catalog/mod.rs (4 hunks)
  • src/handlers/http/ingest.rs (3 hunks)
  • src/handlers/http/modal/utils/ingest_utils.rs (4 hunks)
  • src/handlers/mod.rs (1 hunks)
  • src/parseable/mod.rs (2 hunks)
  • src/parseable/staging/writer.rs (2 hunks)
  • src/parseable/streams.rs (2 hunks)
  • src/storage/field_stats.rs (4 hunks)
  • src/storage/object_storage.rs (5 hunks)
  • src/utils/json/mod.rs (3 hunks)
🧰 Additional context used
🧠 Learnings (3)
📚 Learning: 2025-03-26T06:44:53.362Z
Learnt from: nikhilsinhaparseable
PR: parseablehq/parseable#1263
File: src/handlers/http/ingest.rs:300-310
Timestamp: 2025-03-26T06:44:53.362Z
Learning: In Parseable, every stream is always associated with a log_source - no stream can exist without a log_source. For otel-traces and otel-metrics, strict restrictions are implemented where ingestion is rejected if a stream already has a different log_source format. However, regular logs from multiple log_sources can coexist in a single stream.

Applied to files:

  • src/handlers/http/ingest.rs
📚 Learning: 2025-02-14T09:49:25.818Z
Learnt from: de-sh
PR: parseablehq/parseable#1185
File: src/handlers/http/logstream.rs:255-261
Timestamp: 2025-02-14T09:49:25.818Z
Learning: In Parseable's logstream handlers, stream existence checks must be performed for both query and standalone modes. The pattern `!PARSEABLE.streams.contains(&stream_name) && (PARSEABLE.options.mode != Mode::Query || !PARSEABLE.create_stream_and_schema_from_storage(&stream_name).await?)` ensures proper error handling in both modes.

Applied to files:

  • src/handlers/http/ingest.rs
📚 Learning: 2025-06-16T09:50:38.636Z
Learnt from: nikhilsinhaparseable
PR: parseablehq/parseable#1346
File: src/parseable/streams.rs:319-331
Timestamp: 2025-06-16T09:50:38.636Z
Learning: In Parseable's Ingest or Query mode, the node_id is always available because it's generated during server initialization itself, before the get_node_id_string() function in streams.rs would be called. This makes the .expect() calls on QUERIER_META.get() and INGESTOR_META.get() safe in this context.

Applied to files:

  • src/parseable/streams.rs
🧬 Code Graph Analysis (6)
src/utils/json/mod.rs (4)
src/utils/json/flatten.rs (4)
  • has_more_than_max_allowed_levels (335-348)
  • custom_partition (104-104)
  • generic_flattening (269-328)
  • flatten (58-93)
src/parseable/mod.rs (3)
  • custom_partition (751-751)
  • custom_partition (923-923)
  • new (146-158)
src/storage/mod.rs (3)
  • new (192-194)
  • new (205-211)
  • default (215-236)
src/event/format/mod.rs (1)
  • new (126-131)
src/storage/object_storage.rs (5)
src/parseable/streams.rs (1)
  • new (114-131)
src/parseable/mod.rs (3)
  • new (146-158)
  • serde_json (300-300)
  • serde_json (306-306)
src/catalog/manifest.rs (1)
  • create_from_parquet_file (91-124)
src/storage/field_stats.rs (1)
  • calculate_field_stats (79-130)
src/catalog/mod.rs (1)
  • update_snapshot (111-128)
src/handlers/http/modal/utils/ingest_utils.rs (2)
src/utils/json/mod.rs (1)
  • convert_array_to_object (219-256)
src/static_schema.rs (1)
  • custom_partition (73-73)
src/handlers/http/ingest.rs (1)
src/handlers/http/modal/utils/ingest_utils.rs (1)
  • flatten_and_push_logs (51-131)
src/parseable/mod.rs (1)
src/static_schema.rs (1)
  • custom_partition (73-73)
src/catalog/mod.rs (3)
src/storage/object_storage.rs (2)
  • new (83-92)
  • manifest_path (1216-1236)
src/stats.rs (4)
  • event_labels (218-220)
  • event_labels_date (226-232)
  • storage_size_labels_date (234-236)
  • get_current_stats (51-109)
src/catalog/manifest.rs (1)
  • default (69-74)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (9)
  • GitHub Check: Quest Smoke and Load Tests for Distributed deployments
  • GitHub Check: Quest Smoke and Load Tests for Standalone deployments
  • GitHub Check: Build Default x86_64-pc-windows-msvc
  • GitHub Check: Build Kafka x86_64-unknown-linux-gnu
  • GitHub Check: Build Default x86_64-apple-darwin
  • GitHub Check: coverage
  • GitHub Check: Build Default aarch64-unknown-linux-gnu
  • GitHub Check: Build Default x86_64-unknown-linux-gnu
  • GitHub Check: Build Default aarch64-apple-darwin
🔇 Additional comments (16)
Cargo.toml (1)

126-126: Rayon dependency addition looks good.

No immediate compatibility or security concerns. Keep an eye on binary size and parallelism defaults where you adopt Rayon to avoid oversubscription (Tokio + Rayon).

If you intend to gate Rayon-backed paths, consider making it an optional feature to avoid pulling it into minimal deployments. Want me to draft a feature-gated dependency stanza?
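
If oversubscription alongside the Tokio runtime ever shows up in practice, one option is to cap Rayon's global pool at startup (sketch only; the core split is illustrative):

```rust
fn init_rayon_pool() {
    // Leave some cores to the Tokio runtime; the 50/50 split here is illustrative.
    let cpus = std::thread::available_parallelism()
        .map(|n| n.get())
        .unwrap_or(4);
    rayon::ThreadPoolBuilder::new()
        .num_threads((cpus / 2).max(1))
        .build_global()
        .expect("rayon global pool can only be initialized once");
}
```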

src/parseable/streams.rs (1)

276-276: ULID-based grouping token: sensible choice for collision-free parquet names.

Using a per-run ULID avoids cross-run collisions and keeps grouping deterministic for a single pass.
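
For reference, the grouping token itself is just a freshly generated ULID; the surrounding file-name shape below is hypothetical:

```rust
/// A fresh ULID per pass: lexicographically sortable and collision-free
/// across runs, so files grouped under it cannot clash.
fn grouping_token() -> String {
    ulid::Ulid::new().to_string()
}

fn example_parquet_name(stream: &str) -> String {
    // Illustrative only; not the actual naming format used by the crate.
    format!("{stream}.{}.data.parquet", grouping_token())
}
```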

src/parseable/mod.rs (4)

538-544: Mutual exclusivity between time and custom partitions: good guardrail.

This aligns with the new semantics and avoids ambiguous layouts. Keep it.

Confirm that the ingestion paths reject payloads that contain both fields when stream metadata already sets one (including updates). If needed, I can scan the codebase and propose harmonized checks at ingestion boundaries.
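
The guardrail itself is small; roughly the following, sketched with stand-in types rather than the actual CreateStreamError:

```rust
use actix_web::http::StatusCode;

/// Reject configurations that set both partitioning modes at once.
fn validate_partition_exclusivity(
    time_partition: &str,
    custom_partition: &str,
) -> Result<(), (StatusCode, String)> {
    if !time_partition.is_empty() && !custom_partition.is_empty() {
        return Err((
            StatusCode::BAD_REQUEST,
            "time partition and custom partition cannot be set together".to_string(),
        ));
    }
    Ok(())
}
```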


871-900: Static schema validation path is correct.

Passing time_partition and custom_partition through to schema conversion ensures early validation of field presence for static schema streams.


902-920: Time-partition limit parsing covers format and zero checks.

Error messages are clear and actionable. No further changes needed.


531-576: Double-check end-to-end enforcement that every event carries the specified time-partition field.

The PR states batches should be rejected if any event lacks the specified time-partition field. This file handles metadata and creation, but not the ingest-time validation.

Would you like me to verify and, if needed, add a guard in the ingest pipeline (e.g., in flatten_and_push_logs/push_logs) to enforce this invariant at batch boundaries?
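
Such a guard could be as small as the following pre-check before conversion (sketch; the error type is a stand-in):

```rust
use serde_json::Value;

/// Reject the whole batch if any event lacks the configured time-partition field.
fn ensure_time_partition_present(batch: &[Value], field: &str) -> Result<(), String> {
    for (idx, event) in batch.iter().enumerate() {
        let present = event.get(field).map(|v| !v.is_null()).unwrap_or(false);
        if !present {
            return Err(format!(
                "event at index {idx} is missing time partition field `{field}`; rejecting batch"
            ));
        }
    }
    Ok(())
}
```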

src/handlers/mod.rs (1)

29-35: Making header constants public is appropriate.

This enables cross-module access to time-partition and related headers without duplication. Keep the constants centralized here.

src/storage/field_stats.rs (1)

126-127: Signature sync with new time_partition param looks correct

Passing None into flatten_and_push_logs aligns with the updated signature and preserves the previous behavior for dataset-stats.

src/handlers/http/ingest.rs (3)

118-133: Good: explicit stream retrieval and error propagation

Matching on PARSEABLE.get_stream and propagating failures removes the prior ambiguity. The OTEL format compatibility check is preserved.


410-425: Same issue in post_event: hard block + not passing time_partition

Mirror the fix applied in ingest: remove the block and pass effective time partition to flatten_and_push_logs.

-    if stream.get_time_partition().is_some() {
-        return Err(PostError::CustomError(
-            "Ingestion is not allowed to stream with time partition".to_string(),
-        ));
-    }
-
-    flatten_and_push_logs(json, &stream_name, &log_source, &p_custom_fields, None).await?;
+    let time_partition = req
+        .headers()
+        .get("x-p-time-partition")
+        .and_then(|h| h.to_str().ok())
+        .map(|s| s.to_string())
+        .or_else(|| stream.get_time_partition());
+
+    flatten_and_push_logs(
+        json,
+        &stream_name,
+        &log_source,
+        &p_custom_fields,
+        time_partition,
+    )
+    .await?;
⛔ Skipped due to learnings
Learnt from: nikhilsinhaparseable
PR: parseablehq/parseable#1263
File: src/handlers/http/ingest.rs:300-310
Timestamp: 2025-03-26T06:44:53.362Z
Learning: In Parseable, every stream is always associated with a log_source - no stream can exist without a log_source. For otel-traces and otel-metrics, strict restrictions are implemented where ingestion is rejected if a stream already has a different log_source format. However, regular logs from multiple log_sources can coexist in a single stream.
Learnt from: de-sh
PR: parseablehq/parseable#1185
File: src/handlers/http/logstream.rs:255-261
Timestamp: 2025-02-14T09:49:25.818Z
Learning: In Parseable's logstream handlers, stream existence checks must be performed for both query and standalone modes. The pattern `!PARSEABLE.streams.contains(&stream_name) && (PARSEABLE.options.mode != Mode::Query || !PARSEABLE.create_stream_and_schema_from_storage(&stream_name).await?)` ensures proper error handling in both modes.

55-67: Time-partition headers are already handled at stream creation
The ingest handler pulls the stream’s time_partition and time_partition_limit from its metadata (via PARSEABLE.get_stream…get_time_partition_limit()) and passes them into flatten_and_push_logs for validation. The raw X-P-Time-Partition and X-P-Time-Partition-Limit headers are parsed when the stream is created or updated in

  • src/handlers/http/modal/utils/logstream_utils.rs (see TIME_PARTITION_KEY / TIME_PARTITION_LIMIT_KEY in PutStreamHeaders)

No changes to the ingest endpoint are required.

src/handlers/http/modal/utils/ingest_utils.rs (1)

51-57: Public API extension is sensible

Adding time_partition: Option<String> to flatten_and_push_logs keeps backward compatibility and enables per-request overrides.

src/catalog/mod.rs (1)

111-128: Batch-oriented snapshot update: good structure and early return

Switching to a Vec of changes and early exit is clean and unlocks per-partition processing.

src/storage/object_storage.rs (1)

75-93: UploadContext is a good encapsulation

The context wrapper simplifies passing stream-derived info across the pipeline.

src/utils/json/mod.rs (2)

126-145: Per-element generic-flatten gating may yield mixed shapes within a single batch

In partitioned mode, generic_flattening is decided per element, whereas the non-partitioned path gates on the whole body. This can produce heterogeneous outputs (some events expanded via generic flattening while others aren’t) within the same batch if depth varies across elements. If downstream expects a uniform schema per batch, this can be surprising.

Please confirm that this behavioral change is intentional and acceptable for consumers. If uniformity is required, consider evaluating the gating once for the entire array and applying the same policy to all elements in that batch.
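
If uniform output per batch is required, the gating could be decided once for the whole array and applied to every element, roughly like this (sketch; `gate` and `expand` stand in for should_apply_generic_flattening and generic_flattening, and the "all elements qualify" policy is one possible choice):

```rust
use serde_json::Value;

/// Decide the generic-flattening policy once per batch and apply it uniformly.
fn flatten_batch_uniformly(
    arr: Vec<Value>,
    gate: impl Fn(&Value) -> bool,
    expand: impl Fn(&Value) -> Vec<Value>,
) -> Vec<Value> {
    // One decision for the whole batch: expand only if every element qualifies.
    let apply_generic = arr.iter().all(&gate);
    if apply_generic {
        arr.iter().flat_map(|v| expand(v)).collect()
    } else {
        arr
    }
}
```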


219-256: Routing logic LGTM

The partition/non-partition routing is clear, and validation is always enabled in the partitioned path. This aligns with rejecting the entire batch when a single event violates partition requirements.

Comment on lines +292 to +354
async fn handle_existing_partition(
    pos: usize,
    partition_changes: Vec<manifest::File>,
    manifests: &mut [snapshot::ManifestItem],
    storage: Arc<dyn ObjectStorage>,
    stream_name: &str,
    events_ingested: u64,
    ingestion_size: u64,
    storage_size: u64,
    partition_lower: DateTime<Utc>,
) -> Result<Option<snapshot::ManifestItem>, ObjectStorageError> {
    let path = partition_path(
        stream_name,
        manifests[pos].time_lower_bound,
        manifests[pos].time_upper_bound,
    );

    let manifest_file_name = manifest_path("").to_string();
    let should_update = manifests[pos].manifest_path.contains(&manifest_file_name);

    if should_update {
        if let Some(mut manifest) = storage.get_manifest(&path).await? {
            // Update existing manifest
            for change in partition_changes {
                manifest.apply_change(change);
            }
            storage.put_manifest(&path, manifest).await?;

            manifests[pos].events_ingested = events_ingested;
            manifests[pos].ingestion_size = ingestion_size;
            manifests[pos].storage_size = storage_size;
            Ok(None)
        } else {
            // Manifest not found, create new one
            create_manifest(
                partition_lower,
                partition_changes,
                storage,
                stream_name,
                false,
                ObjectStoreFormat::default(), // We don't have meta here, use default
                events_ingested,
                ingestion_size,
                storage_size,
            )
            .await
        }
    } else {
        // Create new manifest for different partition
        create_manifest(
            partition_lower,
            partition_changes,
            storage,
            stream_name,
            false,
            ObjectStoreFormat::default(),
            events_ingested,
            ingestion_size,
            storage_size,
        )
        .await
    }
}
🛠️ Refactor suggestion

Wrong meta used when creating a manifest on the “existing partition but missing manifest” path

When an existing partition’s manifest file is missing, handle_existing_partition calls create_manifest with ObjectStoreFormat::default(). This loses stream-specific settings (e.g., time_partition), which can lead to incorrect first_event_at.

Pass the real meta instead.

-async fn handle_existing_partition(
+async fn handle_existing_partition(
     pos: usize,
     partition_changes: Vec<manifest::File>,
     manifests: &mut [snapshot::ManifestItem],
     storage: Arc<dyn ObjectStorage>,
     stream_name: &str,
     events_ingested: u64,
     ingestion_size: u64,
     storage_size: u64,
     partition_lower: DateTime<Utc>,
-) -> Result<Option<snapshot::ManifestItem>, ObjectStorageError> {
+    meta: &ObjectStoreFormat,
+) -> Result<Option<snapshot::ManifestItem>, ObjectStorageError> {
@@
-        } else {
+        } else {
             // Manifest not found, create new one
             create_manifest(
                 partition_lower,
                 partition_changes,
                 storage,
                 stream_name,
                 false,
-                ObjectStoreFormat::default(), // We don't have meta here, use default
+                meta.clone(),
                 events_ingested,
                 ingestion_size,
                 storage_size,
             )
             .await
         }
@@
-    } else {
+    } else {
         // Create new manifest for different partition
         create_manifest(
             partition_lower,
             partition_changes,
             storage,
             stream_name,
             false,
-            ObjectStoreFormat::default(),
+            meta.clone(),
             events_ingested,
             ingestion_size,
             storage_size,
         )
         .await
     }

And pass meta from process_single_partition:

-        handle_existing_partition(
+        handle_existing_partition(
             pos,
             partition_changes,
             &mut meta.snapshot.manifest_list,
             storage,
             stream_name,
             events_ingested,
             ingestion_size,
             storage_size,
             partition_lower,
+            meta,
         )

Also applies to: 246-288

🤖 Prompt for AI Agents
In src/catalog/mod.rs around lines 292 to 354 (and similarly at 246 to 288), the
code creates a new manifest using ObjectStoreFormat::default() when an existing
partition’s manifest is missing, which loses stream-specific settings; change
handle_existing_partition to accept the real stream meta (add a meta parameter)
and replace the ObjectStoreFormat::default() arguments with that meta, then
update the callers (notably process_single_partition) to pass the stream’s meta
through to handle_existing_partition so created manifests use the correct stream
settings.

Comment on lines +139 to 146
    if stream.get_time_partition().is_some() {
        return Err(PostError::CustomError(
            "Ingestion is not allowed to stream with time partition".to_string(),
        ));
    }

    flatten_and_push_logs(json, &stream_name, &log_source, &p_custom_fields, None).await?;

🛠️ Refactor suggestion

Blocking ingestion for time-partitioned streams contradicts the PR objective and is inconsistent across endpoints

Rejecting ingestion when stream.get_time_partition().is_some() prevents ingesting into time-partitioned datasets at all via this endpoint. The PR aims to enable time-partitioned ingestion and to validate presence of the field; enforcement should happen during flatten/convert, not by hard-blocking the endpoint. Also note: OTEL paths currently don’t apply this block, leading to inconsistent behavior.

Action:

  • Remove the hard block.
  • Pass an effective time_partition to flatten_and_push_logs (prefer header x-p-time-partition if provided; otherwise fall back to stream-configured time_partition). This allows per-request override while keeping stream defaults intact.
-    if stream.get_time_partition().is_some() {
-        return Err(PostError::CustomError(
-            "Ingestion is not allowed to stream with time partition".to_string(),
-        ));
-    }
-
-    flatten_and_push_logs(json, &stream_name, &log_source, &p_custom_fields, None).await?;
+    // Prefer header override, else fall back to stream-configured time partition
+    let time_partition = req
+        .headers()
+        .get("x-p-time-partition")
+        .and_then(|h| h.to_str().ok())
+        .map(|s| s.to_string())
+        .or_else(|| stream.get_time_partition());
+
+    flatten_and_push_logs(
+        json,
+        &stream_name,
+        &log_source,
+        &p_custom_fields,
+        time_partition,
+    )
+    .await?;
🤖 Prompt for AI Agents
In src/handlers/http/ingest.rs around lines 139 to 146, remove the hard block
that returns an error when stream.get_time_partition().is_some(); instead
compute an effective time_partition by checking the request header
"x-p-time-partition" first and, if absent, falling back to
stream.get_time_partition(), then pass that effective time_partition into the
call to flatten_and_push_logs so validation/enforcement happens during
flatten/convert (maintain existing Option type and await behavior).

Comment on lines +528 to 533
        let time_partition_in_days = if !time_partition_limit.is_empty() {
            Some(validate_time_partition_limit(&time_partition_limit)?)
        } else {
            None
        };

🛠️ Refactor suggestion

Enforce the documented default time-partition limit (30d) when header is absent.

Per PR description, X-P-Time-Partition-Limit should default to 30d. The current logic sets None when the limit header is empty, which skips the default.

Apply this diff:

-        let time_partition_in_days = if !time_partition_limit.is_empty() {
-            Some(validate_time_partition_limit(&time_partition_limit)?)
-        } else {
-            None
-        };
+        // Default to 30d when a time partition is requested without an explicit limit.
+        let time_partition_in_days = if !time_partition.is_empty() {
+            if !time_partition_limit.is_empty() {
+                Some(validate_time_partition_limit(&time_partition_limit)?)
+            } else {
+                Some(std::num::NonZeroU32::new(30).expect("30 is non-zero"))
+            }
+        } else {
+            None
+        };
🤖 Prompt for AI Agents
In src/parseable/mod.rs around lines 528 to 533, the code returns None when the
X-P-Time-Partition-Limit header is empty, but the PR requires a default of 30d;
instead, when the header is empty use the literal "30d" as the
time_partition_limit and pass that through validate_time_partition_limit,
returning Some(validated_value) so the default is enforced rather than skipping
validation.

Comment on lines +96 to 105
        // If file exists, append a random string before .date to avoid overwriting
        if arrow_path.exists() {
            let file_name = arrow_path.file_name().unwrap().to_string_lossy();
            let date_pos = file_name
                .find(".date")
                .expect("File name should contain .date");
            let random_suffix = Alphanumeric.sample_string(&mut rand::thread_rng(), 8);
            let new_name = format!("{}{}", random_suffix, &file_name[date_pos..]);
            arrow_path.set_file_name(new_name);
        }
🛠️ Refactor suggestion

⚠️ Potential issue

Do not panic in Drop; preserve the stream hash when avoiding collisions.

  • Panicking on Line 101 if ".date" is missing can abort the process during drop.
  • Current logic drops the original stream-hash prefix by replacing it with the random token, which weakens naming consistency.

Make Drop robust and insert the random token before ".date" while preserving the original prefix. Fall back to simple suffixing if the expected pattern is missing.

Apply this diff:

-        // If file exists, append a random string before .date to avoid overwriting
-        if arrow_path.exists() {
-            let file_name = arrow_path.file_name().unwrap().to_string_lossy();
-            let date_pos = file_name
-                .find(".date")
-                .expect("File name should contain .date");
-            let random_suffix = Alphanumeric.sample_string(&mut rand::thread_rng(), 8);
-            let new_name = format!("{}{}", random_suffix, &file_name[date_pos..]);
-            arrow_path.set_file_name(new_name);
-        }
+        // If the target exists, pick a unique target by inserting a random token before `.date`.
+        // Never panic in Drop; fall back to simple suffixing if the pattern is unexpected.
+        if arrow_path.exists() {
+            if let Some(file_name) = arrow_path.file_name().and_then(|s| s.to_str()) {
+                let random = Alphanumeric.sample_string(&mut rand::thread_rng(), 8);
+                if let Some(date_pos) = file_name.find(".date") {
+                    // Preserve the original prefix (e.g., stream hash) and insert random token.
+                    let new_name =
+                        format!("{}.{random}{}", &file_name[..date_pos], &file_name[date_pos..]);
+                    arrow_path.set_file_name(new_name);
+                } else {
+                    // Unexpected name; just append the random token.
+                    let new_name = format!("{file_name}.{random}");
+                    arrow_path.set_file_name(new_name);
+                }
+            }
+        }

Comment on lines +102 to +147
/// Handles the upload of a single parquet file
async fn upload_single_parquet_file(
    store: Arc<dyn ObjectStorage>,
    path: std::path::PathBuf,
    stream_relative_path: String,
    stream_name: String,
    schema: Arc<Schema>,
) -> Result<UploadResult, ObjectStorageError> {
    let filename = path
        .file_name()
        .expect("only parquet files are returned by iterator")
        .to_str()
        .expect("filename is valid string");

    // Upload the file
    if let Err(e) = store
        .upload_multipart(&RelativePathBuf::from(&stream_relative_path), &path)
        .await
    {
        error!("Failed to upload file {filename:?}: {e}");
        return Ok(UploadResult {
            stats_calculated: false,
            file_path: path,
            manifest_file: None,
        });
    }

    // Update storage metrics
    update_storage_metrics(&path, &stream_name, filename)?;

    // Create manifest entry
    let absolute_path = store
        .absolute_url(RelativePath::from_path(&stream_relative_path).expect("valid relative path"))
        .to_string();

    let manifest = catalog::create_from_parquet_file(absolute_path, &path)?;

    // Calculate field stats if enabled
    let stats_calculated = calculate_stats_if_enabled(&stream_name, &path, &schema).await;

    Ok(UploadResult {
        stats_calculated,
        file_path: path,
        manifest_file: Some(manifest),
    })
}
🛠️ Refactor suggestion

Plumb stream-relative path into metrics updater to avoid brittle filename parsing

Current metrics update derives date from filename using fixed splits which can panic if the naming deviates. Instead, use the already-built stream_relative_path where the date= segment is explicit.

-    // Update storage metrics
-    update_storage_metrics(&path, &stream_name, filename)?;
+    // Update storage metrics
+    update_storage_metrics(&path, &stream_name, &stream_relative_path)?;

And update the helper (see next comment).

🤖 Prompt for AI Agents
In src/storage/object_storage.rs around lines 102 to 147, the call to
update_storage_metrics currently passes filename which forces the helper to
parse the date via brittle filename splits; instead pass the already-constructed
stream_relative_path so the helper can extract the explicit "date=" segment
reliably. Modify this call to provide &stream_relative_path (and adjust the
update_storage_metrics signature and its internal parsing to extract date from
the relative path's "date=" segment), and update any other call sites of
update_storage_metrics to match the new signature.

Comment on lines +149 to +170
/// Updates storage-related metrics for an uploaded file
fn update_storage_metrics(
    path: &std::path::Path,
    stream_name: &str,
    filename: &str,
) -> Result<(), ObjectStorageError> {
    let mut file_date_part = filename.split('.').collect::<Vec<&str>>()[0];
    file_date_part = file_date_part.split('=').collect::<Vec<&str>>()[1];
    let compressed_size = path.metadata().map_or(0, |meta| meta.len());

    STORAGE_SIZE
        .with_label_values(&["data", stream_name, "parquet"])
        .add(compressed_size as i64);
    EVENTS_STORAGE_SIZE_DATE
        .with_label_values(&["data", stream_name, "parquet", file_date_part])
        .add(compressed_size as i64);
    LIFETIME_EVENTS_STORAGE_SIZE
        .with_label_values(&["data", stream_name, "parquet"])
        .add(compressed_size as i64);

    Ok(())
}
🛠️ Refactor suggestion

Brittle date extraction can panic; parse date= from the stream-relative path instead

Guard against malformed names and avoid indexing into split results.

-fn update_storage_metrics(
-    path: &std::path::Path,
-    stream_name: &str,
-    filename: &str,
-) -> Result<(), ObjectStorageError> {
-    let mut file_date_part = filename.split('.').collect::<Vec<&str>>()[0];
-    file_date_part = file_date_part.split('=').collect::<Vec<&str>>()[1];
+fn update_storage_metrics(
+    path: &std::path::Path,
+    stream_name: &str,
+    stream_relative_path: &str,
+) -> Result<(), ObjectStorageError> {
+    // Expect a "date=YYYY-MM-DD" segment in the relative path
+    let maybe_date = stream_relative_path
+        .split('/')
+        .find_map(|seg| seg.strip_prefix("date="));
+    let Some(file_date_part) = maybe_date else {
+        warn!("Could not extract date from path {stream_relative_path}; skipping date-based metrics update");
+        return Ok(());
+    };
     let compressed_size = path.metadata().map_or(0, |meta| meta.len());
 
     STORAGE_SIZE
         .with_label_values(&["data", stream_name, "parquet"])
         .add(compressed_size as i64);
     EVENTS_STORAGE_SIZE_DATE
-        .with_label_values(&["data", stream_name, "parquet", file_date_part])
+        .with_label_values(&["data", stream_name, "parquet", file_date_part])
         .add(compressed_size as i64);
     LIFETIME_EVENTS_STORAGE_SIZE
         .with_label_values(&["data", stream_name, "parquet"])
         .add(compressed_size as i64);
 
     Ok(())
 }
🤖 Prompt for AI Agents
In src/storage/object_storage.rs around lines 149 to 170, the current filename
parsing uses fragile indexing on split results and can panic for malformed
names; replace that logic by locating the "date=" segment in the stream-relative
path safely (e.g., use split_once or find to locate "date=" and then extract
until the next separator or dot), validate that you actually found a date token
and return an Err(ObjectStorageError) when it is missing or malformed instead of
indexing into vectors, and ensure compressed_size retrieval remains unchanged;
this makes extraction defensive and prevents panics on unexpected file names.


Successfully merging this pull request may close these issues.

Possibility to change the default time partition field to query over historical data